package com.zhss.seckill.order;

import com.alibaba.fastjson.JSONObject;
import com.zhss.seckill.common.JedisManager;
import com.zhss.seckill.common.RedisCluster;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

@Component
public class BootListener implements CommandLineRunner {

    public static final Long ORDER_RATE_LIMIT = 500L;

    public void run(String... strings) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                "seckill-order-consumer-group");
        consumer.setNamesrvAddr("localhost:9876");

        consumer.subscribe("flash_sale_success_inform", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messageExts,
                                                            ConsumeConcurrentlyContext context) {
                for(MessageExt messageExt : messageExts) {
                    System.out.println(new String(messageExt.getBody()));

                    // 抢购成功的通知
                    JSONObject flashSaleSuccessInform = JSONObject.parseObject(
                            new String(messageExt.getBody()));

                    // 调用订单中心提供的接口进行秒杀抢购的下单
                    Long userId = flashSaleSuccessInform.getLong("userId");
                    Long productId = flashSaleSuccessInform.getLong("productId");

                    // 对每个消息都获取一下他的发送的时间戳
                    // 然后判断一下，如果消息在MQ里积压超过了半个小时，此时就快速失败
                    // fail-fast机制，直接推送消息到MQ，通知秒杀库存服务，释放掉抢购的库存
                    // 更新抢购的状态，全部更新为抢购失败

                    // 在这里需要对下单进行限流
                    JedisManager jedisManager = JedisManager.getInstance();
                    Jedis jedis = jedisManager.getJedis();

                    Boolean orderResult = false;

                    while(!orderResult) {
                        Date now = new Date();
                        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        String currentSecond = dateFormat.format(now);

                        Long result = jedis.incr("flash_sale::order::rate_limit::" + currentSecond);
                        if(result < ORDER_RATE_LIMIT) {
                            System.out.println("调用调用订单中心提供的接口进行秒杀抢购的下单，用户id: "
                                    + userId + ", 商品id: " + productId);
                            // 如果说订单系统崩溃了
                            // 那么你的消费线程应该进入阻塞，就不要消费后面的消息了
                            // 阻塞个几分钟过后，再尝试调用订单系统去进行下单
                            orderResult = true;
                        } else {
                            // 如果当前这一秒限流了，此时休眠一秒，下一秒继续进行下单就可以了
                            try {
                                Thread.sleep(1000);
                            } catch(InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("消费者启动......");
    }

}
